/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.kafka;

import com.google.common.base.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

/**
 * Kafka Producer with public methods to extract the producer state then resuming transaction in another process.
 * This Producer is to be used only if you need to extract the transaction state and resume it from a different process.
 * Class is mostly taken from Apache Flink Project:
 * org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer
 *
 * @param <K> key serializer class.
 * @param <V> value serializer class.
 */
class HiveKafkaProducer<K, V> implements Producer<K, V> {
  private static final Logger LOG = LoggerFactory.getLogger(HiveKafkaProducer.class);

  private final KafkaProducer<K, V> kafkaProducer;

  @Nullable private final String transactionalId;

  HiveKafkaProducer(Properties properties) {
    transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
    kafkaProducer = new KafkaProducer<>(properties);
  }

  @Override public void initTransactions() {
    kafkaProducer.initTransactions();
  }

  @Override public void beginTransaction() throws ProducerFencedException {
    kafkaProducer.beginTransaction();
  }

  @Override public void commitTransaction() throws ProducerFencedException {
    kafkaProducer.commitTransaction();
  }

  @Override public void abortTransaction() throws ProducerFencedException {
    kafkaProducer.abortTransaction();
  }

  @Override public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
      throws ProducerFencedException {
    kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
  }

  @Override public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata)
      throws ProducerFencedException {
    kafkaProducer.sendOffsetsToTransaction(offsets, groupMetadata);
  }

  @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return kafkaProducer.send(record);
  }

  @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    return kafkaProducer.send(record, callback);
  }

  @Override public List<PartitionInfo> partitionsFor(String topic) {
    return kafkaProducer.partitionsFor(topic);
  }

  @Override public Map<MetricName, ? extends Metric> metrics() {
    return kafkaProducer.metrics();
  }

  @Override public void close() {
    kafkaProducer.close();
  }

  @Override public void close(Duration duration) {
    kafkaProducer.close(duration);
  }

  @Override public void flush() {
    kafkaProducer.flush();
    if (transactionalId != null) {
      flushNewPartitions();
    }
  }

  /**
   * Instead of obtaining producerId and epoch from the transaction coordinator, re-use previously obtained ones,
   * so that we can resume transaction after a restart. Implementation of this method is based on
   * {@link org.apache.kafka.clients.producer.KafkaProducer#initTransactions}.
   */
  synchronized void resumeTransaction(long producerId, short epoch) {
    Preconditions.checkState(producerId >= 0 && epoch >= 0,
        "Incorrect values for producerId {} and epoch {}",
        producerId,
        epoch);
    LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);

    Object transactionManager = getValue(kafkaProducer, "transactionManager");

    Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
    invoke(transactionManager,
        "transitionTo",
        getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
    invoke(topicPartitionBookkeeper, "reset");
    Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
    setValue(producerIdAndEpoch, "producerId", producerId);
    setValue(producerIdAndEpoch, "epoch", epoch);

    invoke(transactionManager,
        "transitionTo",
        getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.READY"));

    invoke(transactionManager,
        "transitionTo",
        getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.IN_TRANSACTION"));
    setValue(transactionManager, "transactionStarted", true);
  }

  @Nullable String getTransactionalId() {
    return transactionalId;
  }

  long getProducerId() {
    Object transactionManager = getValue(kafkaProducer, "transactionManager");
    Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
    return (long) getValue(producerIdAndEpoch, "producerId");
  }

  short getEpoch() {
    Object transactionManager = getValue(kafkaProducer, "transactionManager");
    Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
    return (short) getValue(producerIdAndEpoch, "epoch");
  }

  /**
   * Besides committing {@link org.apache.kafka.clients.producer.KafkaProducer#commitTransaction} is also adding new
   * partitions to the transaction. flushNewPartitions method is moving this logic to pre-commit/flush, to make
   * resumeTransaction simpler.
   * Otherwise resumeTransaction would require to restore state of the not yet added/"in-flight" partitions.
   */
  private void flushNewPartitions() {
    LOG.info("Flushing new partitions");
    TransactionalRequestResult result = enqueueNewPartitions();
    Object sender = getValue(kafkaProducer, "sender");
    invoke(sender, "wakeup");
    result.await();
  }

  private synchronized TransactionalRequestResult enqueueNewPartitions() {
    Object transactionManager = getValue(kafkaProducer, "transactionManager");
    Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
    invoke(transactionManager,
        "enqueueRequest",
        new Class[] {txnRequestHandler.getClass().getSuperclass()},
        new Object[] {txnRequestHandler});
    return (TransactionalRequestResult) getValue(txnRequestHandler,
        txnRequestHandler.getClass().getSuperclass(),
        "result");
  }

  @SuppressWarnings("unchecked") private static Enum<?> getEnum(String enumFullName) {
    @SuppressWarnings("RegExpRedundantEscape") String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
    if (x.length == 2) {
      String enumClassName = x[0];
      String enumName = x[1];
      try {
        Class<Enum> cl = (Class<Enum>) Class.forName(enumClassName);
        return Enum.valueOf(cl, enumName);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException("Incompatible KafkaProducer version", e);
      }
    }
    return null;
  }

  private static Object invoke(Object object, String methodName, Object... args) {
    Class<?>[] argTypes = new Class[args.length];
    for (int i = 0; i < args.length; i++) {
      argTypes[i] = args[i].getClass();
    }
    return invoke(object, methodName, argTypes, args);
  }

  private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
    try {
      Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
      method.setAccessible(true);
      return method.invoke(object, args);
    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
      throw new RuntimeException("Incompatible KafkaProducer version", e);
    }
  }

  private static Object getValue(Object object, String fieldName) {
    return getValue(object, object.getClass(), fieldName);
  }

  private static Object getValue(Object object, Class<?> clazz, String fieldName) {
    try {
      Field field = clazz.getDeclaredField(fieldName);
      field.setAccessible(true);
      return field.get(object);
    } catch (NoSuchFieldException | IllegalAccessException e) {
      throw new RuntimeException("Incompatible KafkaProducer version", e);
    }
  }

  private static void setValue(Object object, String fieldName, Object value) {
    try {
      Field field = object.getClass().getDeclaredField(fieldName);
      field.setAccessible(true);
      field.set(object, value);
    } catch (NoSuchFieldException | IllegalAccessException e) {
      throw new RuntimeException("Incompatible KafkaProducer version", e);
    }
  }
}
